Apache Airflow是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过Apache Airflow的Livy Operator实现自动化地向EMR Serverless Spark提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
背景信息
Apache Livy通过REST接口与Spark进行交互,极大简化了Spark和应用程序服务器之间的通信复杂度。关于Livy API,请参见REST API。
前提条件
已安装并启动Airflow服务,详情请参见Installation of Airflow。
已创建工作空间,详情请参见创建工作空间。
操作步骤
步骤一:创建Gateway及访问Token
创建并启动Gateway。
进入Gateway页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的
。
单击Livy Gateway页签。
在Livy Gateway页面,单击创建Livy Gateway。
在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建。
其余参数请根据具体情况进行调整,更多参数信息请参见管理Gateway。
在Livy Gateway页面,单击已创建Gateway操作列的启动。
创建Token。
在Gateway页面,单击Livy-gateway操作列的Token管理。
单击创建Token。
在创建Token对话框中,输入名称(例如,Livy-token),单击确定。
复制Token信息。
重要Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。
步骤二:配置Apache Airflow
执行以下命令,在Apache Airflow环境中安装Apache Livy。
pip install apache-airflow-providers-apache-livy
添加Connection。
UI方式
在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection。
涉及以下信息:
Host:填写为Gateway中的Endpoint信息。
Schema:填写为https。
Extra:填写JSON字符串,
x-acs-spark-livy-token
为您前一个步骤中复制的Token信息。{ "x-acs-spark-livy-token": "6ac**********kfu" }
CLI方式
通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection。
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # Gateway中的Endpoint信息。 "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # 为您前一个步骤中复制的Token信息。 } }'
步骤三: 使用Livy Operator提交Spark任务
Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。
从阿里云OSS获取并执行Python脚本文件。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
file
为您的Spark任务对应的文件路径,本文示例为上传至阿里云OSS上的JAR包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传。
步骤四:查看提交至EMR的任务
在EMR Serverless Spark页面,单击左侧导航栏中的任务历史。
在任务历史的开发任务页签,您可以查看提交的任务。
相关文档
在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator
接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务。